-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement GROUPING aggregate function (following Postgres behavior.) #12565
Conversation
// The PhysicalExprs of grouping_exprs must be Column PhysicalExpr. Because if | ||
// the group by PhysicalExpr in SQL is non-Column PhysicalExpr, then there is | ||
// a ProjectionExec before AggregateExec to convert the non-column PhysicalExpr | ||
// to Column PhysicalExpr. | ||
let column_index = | ||
|expr: &Arc<dyn PhysicalExpr>| match expr.as_any().downcast_ref::<Column>() { | ||
Some(column) => Ok(column.index()), | ||
None => internal_err!("Grouping doesn't support expr: {}", expr), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only true when one enabled the optimizer rule CommonSubexprEliminate
. Does not seems like a acceptable to depend on optimizer rules for correctness/basic support.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we look for equal PhysicalExprs?
The Postgres docs imply they do ~text comparison but I'm not sure how accessible that info is at this layer.
// GROUPING is a special fxn that exposes info about group organization | ||
if let Some(grouping) = agg_expr.fun().inner().as_any().downcast_ref::<Grouping>() { | ||
let args = agg_expr.all_expressions().args; | ||
return grouping.create_grouping_accumulator(&args, &group_by.expr); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we need special handling like this it seems to me that we should consider just making Grouping
a build in.
Or we should probably make it more generic so it can be used to implement some other function. But since the input is is just the bitmaks and the output is the same. I wonder if there are any conceivable functions that could not just be implemented as a transformation on a builtin grouping function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's kind of in a weird place, it's sort of not a real aggregation function but instead a way to leak metadata. That might be a reason to make it a built-in.
Do you have ideas about how and when to go about doing that?
There's another function called GROUP_ID (not to be confused with GROUPING_ID) which disambiguates duplicate rows, it might be relevant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's kind of in a weird place, it's sort of not a real aggregation function but instead a way to leak metadata. That might be a reason to make it a built-in.
Do you have ideas about how and when to go about doing that?
One way might be that we expose the grouping_id
column used in #12571 and implement the function as transformation on that. This should be possible as that column should "leak" the needed metadata. This is what was proposed in #5749
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed an initial implemenation of this here: #12704
I think someone with more experince with this project should decide what is the best way forward.
Looks like there is a minor clippy failure on this PR |
382bb23
to
9c0ed8a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bgjackma for your contribution. Great PR and the testing is awesome.
I would probably add user documentation, examples and benchmarks. But it can be also done as followup PRs
grouping_args: &[Arc<dyn PhysicalExpr>], | ||
group_exprs: &[(Arc<dyn PhysicalExpr>, String)], | ||
) -> Result<Box<dyn GroupsAccumulator>> { | ||
if grouping_args.len() > 32 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets have it as a const
}; | ||
let group_by_columns: Result<Vec<_>> = | ||
group_exprs.iter().map(|(e, _)| column_index(e)).collect(); | ||
let group_by_columns = group_by_columns?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be 1 liner?
struct GroupingAccumulator { | ||
// Grouping ID value for each group | ||
grouping_ids: Vec<u32>, | ||
// Indices of GROUPING arguments as they appear in the GROUPING SET |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have more details or example on indices?
} | ||
|
||
impl GroupingAccumulator { | ||
fn mask_to_id(&self, mask: &[bool]) -> Result<u32> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add more description on this method, how it changes the mask
_opt_filter: Option<&BooleanArray>, | ||
total_num_groups: usize, | ||
) -> Result<()> { | ||
assert_eq!(values.len(), 1, "single argument to merge_batch"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so we always expect only 1 array ?
expr_indices: vec![5], | ||
}; | ||
let res = grouping.mask_to_id(&[false]); | ||
assert!(res.is_err()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you may want to check the error message as well
@@ -1169,7 +1176,7 @@ pub(crate) fn evaluate_group_by( | |||
.groups | |||
.iter() | |||
.map(|group| { | |||
group | |||
let v = group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets have more meaningful name?
) -> Result<Box<dyn GroupsAccumulator>> { | ||
// GROUPING is a special fxn that exposes info about group organization |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// GROUPING is a special fxn that exposes info about group organization | |
// GROUPING is a special function that exposes info about group organization |
?
@@ -870,6 +883,7 @@ impl GroupedHashAggregateStream { | |||
| AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?), | |||
} | |||
} | |||
debug!("Output: {:?}", output); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
debug!("Output: {:?}", output); |
})?; | ||
let mut output = group_values | ||
.first() | ||
.map(|gs| gs.values.clone()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets have better naming?
Which issue does this PR close?
Closes #5647.
Rationale for this change
Implements the GROUPING function as per Postgres.
https://www.postgresql.org/docs/15/functions-aggregate.html#FUNCTIONS-GROUPING-TABLE
This is in contrast to other implementations including Databricks and Oracle where GROUPING takes only one column and there is a GROUPING_ID function that yields a similar bitfield.
What changes are included in this PR?
Implement the aggregate function in the Physical Planning stage.
Are these changes tested?
A few unit tests and an integration test provided by @JasonLi-cn in a previous unfinished PR. May add more.
Are there any user-facing changes?